package com.uyuni.rpc.transport;

import com.uyuni.rpc.common.exception.remoting.RemotingSendRequestException;
import com.uyuni.rpc.common.exception.remoting.RemotingTimeoutException;
import com.uyuni.rpc.common.protocol.UyuniProtocol;
import com.uyuni.rpc.common.utils.Pair;
import com.uyuni.rpc.transport.model.NettyChannelInactiveProcessor;
import com.uyuni.rpc.transport.model.NettyRequestProcessor;
import com.uyuni.rpc.transport.model.RemotingResponse;
import com.uyuni.rpc.transport.model.RemotingTransporter;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.SocketAddress;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import static com.uyuni.rpc.common.protocol.UyuniProtocol.REQUEST_REMOTING;
import static com.uyuni.rpc.common.protocol.UyuniProtocol.RESPONSE_REMOTING;


/**
 * Transport logic shared by the Netty client and server ends.
 *
 * <p>Subclasses take care of the Netty setup work (creating channels, bootstraps, ...);
 * this abstract class uses the channels they create to talk to the remote end: it sends
 * synchronous requests, dispatches incoming requests to registered processors and pairs
 * incoming responses with the requests that are waiting for them.
 *
 * @author zhanghailin
 */
public abstract class NettyRemotingBase {

    private static final Logger logger = LoggerFactory.getLogger(NettyRemotingBase.class);

    /** In-flight requests: key is the request opaque, value is the holder for the remote result. */
    protected final ConcurrentHashMap<Long, RemotingResponse> responseTable = new ConcurrentHashMap<>(256);

    /** Fallback processor used when no processor was registered for a request's code. */
    protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;

    /** Processors registered per request code, each paired with the executor it runs on. */
    protected final HashMap<Byte/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
            new HashMap<>(64);

    /** Processor invoked when a channelInactive event occurs, paired with its executor. */
    protected Pair<NettyChannelInactiveProcessor, ExecutorService> defaultChannelInactiveProcessor;

    /** Shared fixed pool of four named threads. */
    protected final ExecutorService publicExecutor = Executors.newFixedThreadPool(4, new ThreadFactory() {
        private final AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    /**
     * Sends {@code request} over {@code channel} and waits for the matching response.
     *
     * @param channel       channel to write the request to
     * @param request       request to send; its opaque is the key used to match the response
     * @param timeoutMillis timeout handed to the {@link RemotingResponse} holder
     * @return the response received for this request, never {@code null}
     * @throws RemotingTimeoutException     if the request was written successfully but no response was obtained
     * @throws RemotingSendRequestException if writing the request to the channel failed
     * @throws InterruptedException         declared for callers; presumably raised while waiting for the response
     */
    public RemotingTransporter invokeSyncImpl(final Channel channel, final RemotingTransporter request, final long timeoutMillis) throws RemotingTimeoutException, RemotingSendRequestException, InterruptedException {
        try {
            // Holder that ties the request id to its eventual result.
            final RemotingResponse remotingResponse = new RemotingResponse(request.getOpaque(), timeoutMillis, null);
            // Park the holder in the table so the response handler can find and fill it.
            this.responseTable.put(request.getOpaque(), remotingResponse);
            channel.writeAndFlush(request).addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    // Written successfully: just record that and wait for the remote answer.
                    remotingResponse.setSendRequestOK(true);
                    return;
                }
                remotingResponse.setSendRequestOK(false);
                // The write failed, so no response will ever arrive: drop the holder from the table,
                responseTable.remove(request.getOpaque());
                // remember why the write failed,
                remotingResponse.setCause(future.cause());
                // and release the waiting caller with a null result.
                remotingResponse.putResponse(null);
                logger.warn("use channel [{}] send msg [{}] failed and failed reason is [{}]", channel, request, future.cause().getMessage());
            });
            // Blocks until a result is delivered or the wait times out.
            final RemotingTransporter remotingTransporter = remotingResponse.waitResponse();
            if (null == remotingTransporter) {
                // A null result after a successful write is treated as the remote end timing out
                // (the original author marked this interpretation as an open question).
                if (remotingResponse.getSendRequestOK()) {
                    throw new RemotingTimeoutException(parseChannelRemoteAddr(channel),
                            timeoutMillis, remotingResponse.getCause());
                } else {
                    throw new RemotingSendRequestException(parseChannelRemoteAddr(channel),
                            remotingResponse.getCause());
                }
            }
            return remotingTransporter;
        } finally {
            // Always clear the table entry, whatever the outcome.
            responseTable.remove(request.getOpaque());
        }

    }

    /**
     * Concrete implementation behind the handler's channelRead0: routes an incoming
     * message by its transporter type. {@code null} messages and unknown types are ignored.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the decoded message, may be {@code null}
     */
    protected void processMessageReceived(ChannelHandlerContext ctx, RemotingTransporter msg) {

        if (logger.isDebugEnabled()) {
            logger.debug("channel [{}] received RemotingTransporter is [{}]", ctx.channel(), msg);
        }

        if (msg == null) {
            return;
        }

        switch (msg.getTransporterType()) {
            // Acting as server: handle a request coming from a client.
            case REQUEST_REMOTING:
                processRemotingRequest(ctx, msg);
                break;
            // Acting as client: handle a response coming from a server.
            case RESPONSE_REMOTING:
                processRemotingResponse(ctx, msg);
                break;
            default:
                break;
        }
    }

    /**
     * Hands a channelInactive event to {@link #defaultChannelInactiveProcessor} on its
     * executor. Does nothing when no such processor is configured.
     *
     * @param ctx context of the channel that became inactive
     */
    protected void processChannelInactive(final ChannelHandlerContext ctx) {
        final Pair<NettyChannelInactiveProcessor, ExecutorService> pair = this.defaultChannelInactiveProcessor;
        if (pair == null) {
            return;
        }
        Runnable run = () -> {
            try {
                pair.getKey().processChannelInactive(ctx);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the owning executor can still observe it.
                Thread.currentThread().interrupt();
                logger.error("server occor exception [{}]", e.getMessage(), e);
            } catch (RemotingSendRequestException | RemotingTimeoutException e) {
                logger.error("server occor exception [{}]", e.getMessage(), e);
            } catch (RuntimeException e) {
                // The Future returned by submit() is discarded, so an uncaught exception
                // would otherwise vanish without any trace.
                logger.error("server occor exception [{}]", e.getMessage(), e);
            }
        };
        try {
            pair.getValue().submit(run);
        } catch (Exception e) {
            logger.error("server is busy,[{}]", e.getMessage());
        }
    }

    /**
     * Dispatches an incoming request to the processor registered for its code, falling
     * back to {@link #defaultRequestProcessor}. The processor runs on its paired executor;
     * a non-null result is written back on the same channel. A processor failure is
     * answered with {@code HANDLER_ERROR}, a rejected submission with {@code HANDLER_BUSY}.
     *
     * @param ctx                 context of the channel the request arrived on
     * @param remotingTransporter the request to process
     */
    protected void processRemotingRequest(final ChannelHandlerContext ctx, final RemotingTransporter remotingTransporter) {

        final Pair<NettyRequestProcessor, ExecutorService> matchedPair = this.processorTable.get(remotingTransporter.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair =
                null == matchedPair ? this.defaultRequestProcessor : matchedPair;
        if (pair == null) {
            // Nothing can handle this request; leave a trace instead of dropping it silently.
            logger.warn("no processor registered for request code [{}] and no default processor, request [{}] is dropped",
                    remotingTransporter.getCode(), remotingTransporter);
            return;
        }

        Runnable run = () -> {
            try {
                RPCHook rpcHook = NettyRemotingBase.this.getRPCHook();
                if (rpcHook != null) {
                    rpcHook.doBeforeRequest(parseChannelRemoteAddr(ctx.channel()), remotingTransporter);
                }
                final RemotingTransporter response = pair.getKey().processRequest(ctx, remotingTransporter);
                if (rpcHook != null) {
                    rpcHook.doAfterResponse(parseChannelRemoteAddr(ctx.channel()),
                            remotingTransporter, response);
                }
                if (null != response) {
                    ctx.writeAndFlush(response).addListener((ChannelFutureListener) future -> {
                        if (!future.isSuccess()) {
                            logger.error("fail send response ,exception is [{}]", future.cause().getMessage());
                        }
                    });
                }
            } catch (Exception e) {
                // Pass the throwable as the trailing argument so the stack trace is kept.
                logger.error("processor occur exception [{}]", e.getMessage(), e);
                final RemotingTransporter response = RemotingTransporter.newInstance(remotingTransporter.getOpaque(), UyuniProtocol.RESPONSE_REMOTING, UyuniProtocol.HANDLER_ERROR, null);
                ctx.writeAndFlush(response);
            }
        };
        try {
            pair.getValue().submit(run);
        } catch (Exception e) {
            // The executor refused the task; tell the peer we are busy.
            logger.error("server is busy,[{}]", e.getMessage());
            final RemotingTransporter response = RemotingTransporter.newInstance(remotingTransporter.getOpaque(), UyuniProtocol.RESPONSE_REMOTING, UyuniProtocol.HANDLER_BUSY, null);
            ctx.writeAndFlush(response);
        }
    }

    /**
     * Supplies the hook invoked around request processing.
     *
     * @return the hook, or {@code null} when none is configured
     */
    protected abstract RPCHook getRPCHook();

    /**
     * Client-side handling of a message returned by the server: finds the waiting
     * holder by opaque, hands it the response and removes it from the table.
     *
     * @param ctx                 context of the channel the response arrived on
     * @param remotingTransporter the response received
     */
    protected void processRemotingResponse(ChannelHandlerContext ctx, RemotingTransporter remotingTransporter) {
        // Look up the holder that was parked for this request.
        final RemotingResponse remotingResponse = responseTable.get(remotingTransporter.getOpaque());
        if (null != remotingResponse) {
            // Not timed out yet. Set the value first so it is in place before the waiter is released.
            remotingResponse.setRemotingTransporter(remotingTransporter);
            // Deliver the response to the waiting caller.
            remotingResponse.putResponse(remotingTransporter);
            // Done with this entry.
            responseTable.remove(remotingTransporter.getOpaque());
        } else {
            logger.warn("received response but matched Id is removed from responseTable maybe timeout");
            logger.warn(remotingTransporter.toString());
        }
    }

    /**
     * Renders the remote address of {@code channel} as text, keeping only the part
     * after the last {@code '/'} of the address's string form.
     *
     * @param channel channel to inspect, may be {@code null}
     * @return the address text, or an empty string when the channel or its remote address is missing
     */
    private String parseChannelRemoteAddr(final Channel channel) {
        if (null == channel) {
            return "";
        }
        final SocketAddress remote = channel.remoteAddress();
        final String addr = remote != null ? remote.toString() : "";
        if (addr.isEmpty()) {
            return "";
        }
        final int index = addr.lastIndexOf("/");
        return index >= 0 ? addr.substring(index + 1) : addr;
    }
}
